-
Notifications
You must be signed in to change notification settings - Fork 551
Feature:3963 Step HeartBeat components #4073
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Conversation
src/zenml/steps/heartbeat.py
Outdated
| _thread.interrupt_main() # raises KeyboardInterrupt in main thread | ||
| # Ensure we stop our own loop as well. | ||
| self._running = False | ||
| except Exception: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TODO: Improve this. For sure try to capture HTTP errors in more verbose logs to avoid excessive log generation if the error is for instance server raising 500 status code.
123c1c4 to
18d8b76
Compare
|
Questions/Comments for reviewers @schustmi @bcdurak : Log info records I see that in general core components (StepLauncher, StepRunner, etc.) we display a very small number of log records. For better visibility during development I have some log records in the heartbeat worker, should these be removed? I am assuming we display few systemic logs to avoid polluting the user experience as they would be interested in their step function logs only? Some follow-up recommendations would be - a) use structured logs with context variables (https://www.structlog.org/en/stable/) to easily filter records by metadata values b) introduce a systemic logger that is configurable. Suppressed by default, when activated it would present all systemic logs. Handling of constants Currently heartbeat interval is hard set as a class variable for the StepHeartBeatWorker cls. For sure I don't want to expose this to user-provided settings as this should be a system setting (too frequent heartbeats from multiple steps may end-up overloading the rest server). I believe a good value would be somewhere in the range of 30-60 seconds. Where would you organize this value? Under Interrupt implementation I went over our signals/daemonize implementations. While that would be the proper implementation for any unix-based system it is not compatible with Windows. I opted to use _thead.interrupt_main() instead which raises a KeyboardInterrupt exception by default, capture it with a context manager that reraises it with a custom exception. Let me know your thoughts. |
18d8b76 to
1352677
Compare
src/zenml/utils/exception_utils.py
Outdated
| self._target_exception = target_exception | ||
| self._message = message |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess to simplify this, we could just pass an instance of the exception here instead of the class and message? That would additionally also allow some exceptions which can/need to be instantiated with multiple arguments.
src/zenml/models/v2/core/step_run.py
Outdated
| """Light-weight model for Step Heartbeat responses.""" | ||
|
|
||
| id: UUID | ||
| status: str |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should probably be of type ExecutionStatus?
| "interrupting main thread", | ||
| self.name, | ||
| ) | ||
| _thread.interrupt_main() # raises KeyboardInterrupt in main thread |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My dynamic pipelines PR introduces running multiple steps in different threads, which doesn't work with this I think.
Can we somehow store the thread from which the heartbeat worker was started, and then interrupt that thread instead of the main one?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah that is an important change, good point. interrupt_main will not work here, we will need to change the pattern a bit. Should I work my changes from your branch?
| step = zen_store().get_run_step(step_run_id, hydrate=True) | ||
| pipeline_run = zen_store().get_run(step.pipeline_run_id) | ||
| verify_permission_for_model(pipeline_run, action=Action.UPDATE) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm wondering whether this RBAC check is even necessary, as running all of this will take quite some time (two calls to the DB, then a request to the RBAC service).
Is there any real harm in leaving this unprotected? I guess it would allow users potential access to the status of the step, which I'm not sure really is a concern.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
True, we can probably do both authenication & authorization with pipeline tokens. Will discuss with @stefannica for directions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let me suggest an alternative: we could limit this endpoint to only be accessed by running pipelines.
Running pipelines (the containerized environment where the steps are running actually) use something called "a workload API token" which is only valid as long as the pipeline run itself is not yet finalized. These workload API tokens are tied to a particular pipeline run (or schedule, in case of scheduled pipelines). So we can also use their scope to limit the range of targets that they can update.
Some references:
- this is the code that verifies the pipeline scoped tokens (you can see some leeway is involved): https://github.com/zenml-io/zenml/blob/main/src/zenml/zen_server/auth.py#L406-L475
- same thing for the schedule-scoped tokens: https://github.com/zenml-io/zenml/blob/main/src/zenml/zen_server/auth.py#L363-L404
A sketch of how you can use this in your endpoint:
def update_heartbeat(
step_run_id: UUID,
auth_context: AuthContext = Security(authorize),
) -> StepHeartbeatResponse:
...
if not auth_context.access_token or not auth_context.access_token.schedule_id and not auth_context.access_token.pipeline_run_id:
raise AuthorizationException("Not authorized")
if auth_context.access_token.pipeline_run_id:
# optionally, check that the step ID is part of this run ID
else: # if auth_context.access_token.schedule_id
# optionally, check that the step ID is part of a run ID that was scheduled with this schedule
This will no longer rely on RBAC calls, but it might still flood the database with a lot of requests, so maybe you could also implement a mini-caching system like the ones used in the previous code references, to reduce its impact.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@stefannica That was my initial idea as well, but do we use those tokens also when running pipelines with service accounts? I thought at some point we used the API key directly when running scheduled pipelines, but I might be misremembering.
I know for sure though that there is a way to generate a generic unscoped token instead of a workload token when running a pipeline (by setting some token expiration env variable), so we'll have to think about how we handle this case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@schustmi yes, even when running pipelines with service accounts, we generate workload API tokens scoped to the pipeline run or schedule. The only case where we use a generic unscoped token is if you set the ZENML_PIPELINE_API_TOKEN_EXPIRATION env variable. But this is a very obscure case, which I don't think we need to handle separately. In that case, we can just not run any RBAC like checks on this endpoint.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The problem is that this is a client-side env variable and I'm not sure we can recognize this case in the server endpoint that receives the heartbeat requests. It will just be a generic token that is not scoped to the run, and if we allow those to call the endpoint without any checks then everyone can do so, no?
- Backend heartbeat support (DB, API) - Heartbeat monitoring worker
3db4503 to
6d077e6
Compare
54b63a9 to
d416056
Compare
| ctx.access_token.schedule_id, hydrate=True | ||
| ) | ||
|
|
||
| if pipeline_run.pipeline.id != schedule.pipeline_id: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe instead of this we can check pipeline_run.schedule.id == ctx.access_token.schedule_id, to make sure the run is actually triggered by the schedule that the token is scoped to?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice, missed that. Yes, that's better 1 DB call shorter.
| # Since interrupt_main raises KeyboardInterrupt we want in this context to capture it | ||
| # and handle it as a custom exception. | ||
|
|
||
| with ContextReraise( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a question for clarification: If someone actually interrupts the python process with CTRL-C, that raises a KeyboardInterrupt in the main thread I assume?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be captured the same way and a misleading exception would appear I would think. But is that a scenario we need to worry about?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess I personally do it quite a lot which is why I thought about it, not sure how common/important it is though. I guess it's quite easy to solve though, so maybe we can account for it?
It could just be a boolean flag on the heartbeat worker that signals that it interrupted the main thread, and we can use that flag to decide whether to re-raise with the HeartbeatInterrupt exception or keep the original KeyboardInterrupt?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeap, that flag already exists in the heartbeat it is .is_terminated. Easy to do, will cover that as well.
| f"Initiating heartbeat for step: {self._invocation_id}" | ||
| ) | ||
|
|
||
| StepHeartbeatWorker(step_id=step_run.id).start() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm guessing you don't stop the worker here and instead rely on the server response for the thread to shut down automatically?
Wouldn't it be better to add an explicit stop once the user code finished executing?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeap makes sense to add it. Won't make much of a difference now but it is safer and more future-proof.
d416056 to
1d19a9f
Compare
1d19a9f to
1649b45
Compare
- Updates migration down revision refs - context-reraise exception - changes in the step-heartbeat logic - fix null heartbeat in list/get endpoints
1649b45 to
c40b9bf
Compare
Describe changes
I implemented/fixed _ to achieve _.
Pre-requisites
Please ensure you have done the following:
developand the open PR is targetingdevelop. If your branch wasn't based on develop read Contribution guide on rebasing branch to develop.Types of changes